跳到主要内容

Java 并发编程-多线程基础使用

Thread 类 和 Runnable接口

Thread class

  • 自定义线程类继承 Thread类
  • 重写 run() 方法,编写线程执行体
  • 创建线程对象,调用 start() 方法启动线程

使用例:下载图片

导入这个包

<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
public class LearnThread extends Thread{

private String url;
private String name;

public LearnThread(String url, String name) {
this.url = url;
this.name = name;
}

@Override
public void run() {
new DownloadWebFile().download(url,name);
System.out.println("下载完成");
}

class DownloadWebFile{
public void download(String url,String name){
try {
FileUtils.copyURLToFile(new URL(url),new File(name));
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
new LearnThread("https://image.alsritter.icu/img/bd_logo1.png","./temp.png").start();
new LearnThread("https://image.alsritter.icu/img/bd_logo1.png","./temp2.png").start();
new LearnThread("https://image.alsritter.icu/img/bd_logo1.png","./temp3.png").start();
}
}

Runnable接口

实际上 Thread 也是继承自这个 Runnable 接口

public
class Thread implements Runnable {
....

那为什么需要添加个 Runnable 接口呢?

主要的原因就是 Java的单继承机制,Thread 是实体类,而 Runnable是一个接口。

而且这个 Runnable 接口是一个函数接口

@FunctionalInterface
public interface Runnable {
public abstract void run();
}

使用步骤

  • 定义 MyRunnable 类去实现 Runnable 接口
  • 实现 run() 方法,编写线程执行体
  • 创建线程对象,调用 Thread类的 start() 方法启动线程

如下,可以直接传递 Runnable

public class LearnRunnable implements Runnable {

@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
System.out.println("等待了"+(i+1));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
new Thread(new LearnRunnable()).start();
}
}

Callable、Future与FutureTask

使用 Runnable 和 Thread 来创建线程有一个弊端,就是 run 方法是没有返回值的。而有时候希望开启一个线程去执行一个任务,并且这个任务执行完成后有一个返回值。

JDK 提供了 Callable 接口与 Future 类来解决这个问题,这也是所谓的 “异步” 模型。

Callable 接口

Callable 与 Runnable 类似,同样是只有一个抽象方法的函数式接口。不同的是,Callable 提供的方法是有返回值的,而且支持泛型。

@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

Callable 一般是配合线程池工具 ExecutorService 来使用的(看线程池那篇文章)。

public class Temp {
public static void main(String[] args) {
// 使用线程池()
ExecutorService executorService = Executors.newFixedThreadPool(10);
Task task = new Task();
List<Future<Integer>> tasks = new ArrayList<>(); // 这里并没多线程同时操作 List,所以可以直接用 ArrayList

// 把任务存在队列里面
for (int i = 0; i < 20; i++) {
Future<Integer> submit = executorService.submit(task);
tasks.add(submit);
}

// 执行完 tasks 队列里面的任务
tasks.forEach(x -> {
try {
// 调用 get 方法会阻塞当前线程,直到得到结果
System.out.println(x.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});

// 释放线程池
executorService.shutdown();
}

static class Task implements Callable<Integer> {

@Override
public Integer call() throws Exception {
// 模拟计算耗时
TimeUnit.SECONDS.sleep(1);
return new Random().nextInt();
}
}
}

上面的 submit 方法会返回一个 Future,后续的程序可以通过这个 Future 的 get 方法得到结果。

Future接口

上面的 Callable 委托给线程池执行,实际这是一种异步任务,那如何取得异步执行的结果?

这种时候就需要使用 Future对象了,在主线程某个时刻调用 Future对象的 get() 方法,就可以获得异步执行的结果,如果异步任务已经完成,就直接获得结果。如果异步任务还没有完成,那么 get() 会阻塞,直到任务完成后才返回结果。

Future 接口只有几个比较简单的方法:

public abstract interface Future<V> {
// cancel方法是试图取消一个线程的执行,注意是试图取消,并不一定能取消成功。
// 因为任务可能已完成、已取消、或者一些其它因素不能取消,存在取消失败的可能。
// 参数 paramBoolean 表示是否采用中断的方式取消线程执行
public abstract boolean cancel(boolean paramBoolean);

// 是否被取消,不过上面的 cancel 方法的返回值已经是 “是否取消成功”
public abstract boolean isCancelled();
// 判断任务是否完成
public abstract boolean isDone();

// 这两个 get 方法能取得任务的返回值,但是如果任务报错了,这个 get 方法执行后就会抛出那个任务的错误
public abstract V get() throws InterruptedException, ExecutionException;
public abstract V get(long paramLong, TimeUnit paramTimeUnit)
throws InterruptedException, ExecutionException, TimeoutException;
}

有时候,为了让任务有能够取消的功能,就使用 Callable 来代替 Runnable。如果只是为了可取消性而使用又不提供可用的结果,则可以使用 Future<?> 形式类型、并返回 null 作为底层任务的结果。

FutureTask类

参考资料 Java多线程入门类和接口

上面介绍了 Future 接口。这个接口有一个默认实现类叫 FutureTask。FutureTask 直接实现是的 RunnableFuture 接口的,而 RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

使用例:

// 自定义 Callable
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 模拟计算需要一秒
Thread.sleep(1000);
return 2;
}

public static void main(String args[]) {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<>(new Task());

executor.submit(futureTask);
System.out.println(futureTask.get());
}
}

注意这里调用 submit 方法是没有返回值的,因为这里实际上是调用的 submit(Runnable task) 方法,所以这里使用 FutureTask 直接通过 get 取值

线程的常用方法

这里常用的方法直接写在代码里面了,效果看注释

public class Temp {
public static void main(String[] args) {
Runnable runnable = () -> {
// 返回当前线程的引用
Thread thread = Thread.currentThread();

// 设置线程名称
thread.setName("线程名称");
// 获取当线程名称
System.out.println(thread.getName());

try {
// 让当前线程停止执行,把 cpu 让给其他线程执行,但不会释放对象锁和监控的状态,到了指定时间后线程又会自动恢复运行状态
// 注意休眠会抛出异常
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 默认情况下,线程的优先级是5。线程的优先级分为 1~10 等级。
// 优先级高的线程得到的 cpu多。也就是说,两个等待的线程,优先级高的线程容易被 cpu执行。
System.out.println(thread.getPriority());
// 还可以手动设置优先级
thread.setPriority(6);

// 取得线程唯一标识
System.out.println(thread.getId());

};

Thread thread = new Thread(runnable);
// 可以设置当前线程为守护线程
thread.setDaemon(true);
// 判断是否开启成功
if (thread.isDaemon()) {
System.out.println("当前线程为守护线程");
}

thread.start();

// 因为是守护线程,所以这里需要设置一个 while 等待守护线程执行完成
// 判断当前线程是否处于活动状态。活动状态就是已经启动尚未终止。
while (thread.isAlive()) {}
}
}

这里单独讲下面几个特殊的方法

interrupt 方法

参考资料 中断线程

这个方法的作用就如它的名字一样,发送一个中断线程的信号。

举个例子:假设从网络下载一个 100M的文件,如果网速很慢,用户等得不耐烦,就可能在下载过程中点 “取消”,这时,程序就需要中断下载线程的执行。

但是要注意,中断线程信号发送后是否响应不是交由 JVM直接掐了,而是目标线程需要反复检测自身状态是否是 interrupted 状态,如果是,就立刻结束运行。

public class Temp {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
int i = 0;
// 这里需要自己去捕获线程是否需要中断,这个机制算是外界与线程通信的一种手段
while (!Thread.currentThread().isInterrupted()) {
System.out.println("当前尚未被中断:" + i);
i++;
}
});

thread.start();
Thread.sleep(1); // 让它执行一毫秒,使之能打印一次

// 注意:这个 interrupt 方法仅仅是发送一个中断信号给线程,至于线程是否能立刻响应,要看线程里面的方法是否去处理
thread.interrupt();
// 因为不是马上线程中断的,所以需要加上个 join 等待线程中断
thread.join();


System.out.println(thread.getState()); // 输出结果为:TERMINATED

// 这里也是有个坑的,线程结束后,状态是 TERMINATED,所以结果是false
// 因此 isInterrupted 一般用于线程里面
if (thread.isInterrupted()) {
System.out.println(thread.getName() + "线程已经中断");
}
}
}

但是对正在睡眠,或者等待状态的线程发送暂停信号会抛出异常

public class Temp {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 因为在捕获到异常后,暂停状态会被重置为 false,所以在抛出异常前还是需要发送信号
Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted..." + e);
}
});

thread.start();
Thread.sleep(1);
thread.interrupt();
}
}
Exception in thread "Thread-0" java.lang.RuntimeException: Thread interrupted...java.lang.InterruptedException: sleep interrupted
at test.Temp.lambda$main$0(Temp.java:16)
at java.base/java.lang.Thread.run(Thread.java:830)

yield 方法

暂停当前正在执行的线程对象,释放自己拥有的CPU,让线程进入就绪状态。

// 这个 yield 方法是本地方法
public static native void yield();

yield() 只是使当前线程重新回到可执行状态,所以执行 yield() 的线程有可能在进入到可执行状态后马上又被执行。

yield() 只能使同优先级或更高优先级的线程有执行的机会。

所以:yield() 不会释放锁,只是通知调度器自己可以让出cpu时间片,但只是建议,调度器也不一定采纳

不过一般这个 yield 方法没什么用,硬要说有啥用的话就是让具有相同优先级的其他线程获得运行机会。说来这个文档注释上也说这个 yield 方法没啥用,一般用作测试

join 方法

这个 join 方法有点像 JavaScript 的 await 关键字(当然 JS没有多线程),让 “主线程”(调用 t.join() 的线程) 等待 “子线程” 结束之后才能继续运行,主要的作用就是让线程同步。但是它并不影响同一时刻处在运行状态的其他线程。

join() 方法底层调用的就是 wait() 方法,所以会让出线程

// 主线程
public class Father extends Thread {
public void run() {
Son s = new Son();
s.start();
s.join();
...
}
}

// 子线程
public class Son extends Thread {
public void run() {
...
}
}

但是得在调用了 start() 方法后才有效果

不过实际上这个 join 方法底层也是通过 while 锁来完成的

public final void join() throws InterruptedException {
join(0);
}


public final synchronized void join(final long millis) throws InterruptedException {
if (millis > 0) {
if (isAlive()) {
final long startTime = System.nanoTime();
long delay = millis;

do {
wait(delay);
} while (isAlive() && (delay = millis -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
throw new IllegalArgumentException("timeout value is negative");
}
}

线程组

参考资料 深入浅出Java多线程- 线程组和线程优先级

线程组是什么

Java 中用 ThreadGroup 来表示线程组,可以使用线程组对线程进行批量控制。

ThreadGroup 和 Thread 的关系就如同他们的字面意思一样简单粗暴,每个 Thread 必然存在于一个 ThreadGroup 中,Thread 不能独立于 ThreadGroup 存在。执行 main() 方法线程的名字是 main,如果在 new Thread 时没有显式指定,那么默认将父线程(当前执行 new Thread 的线程)线程组设置为自己的线程组。

public class Demo {
public static void main(String[] args) {
Thread testThread = new Thread(() -> {
System.out.println("当前线程组名字:" +
Thread.currentThread().getThreadGroup().getName());

System.out.println("线程名字:" +
Thread.currentThread().getName());
});

testThread.start();
System.out.println("执行main方法线程名字:" + Thread.currentThread().getName());
}
}

线程组的常用方法

获取当前的线程组名字

Thread.currentThread().getThreadGroup().getName()

复制线程组

// 复制一个线程数组到一个线程组
Thread[] threads = new Thread[threadGroup.activeCount()];
TheadGroup threadGroup = new ThreadGroup();
threadGroup.enumerate(threads);

线程组统一异常处理

public class ThreadGroupDemo {
public static void main(String[] args) {
ThreadGroup threadGroup = new ThreadGroup("group1") {
// 继承 ThreadGroup 并重写下面这个方法,它会捕获未处理的异常
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName() + ": " + e.getMessage());
}
};

// 这个线程是 threadGroup 的一员
Thread thread1 = new Thread(threadGroup, new Runnable() {
public void run() {
// 抛出 unchecked 异常
throw new RuntimeException("测试异常");
}
});

thread1.start();
}
}

线程组的数据结构

线程组是 树形结构,所以线程组里面还可以包含其它的线程组

首先看看 ThreadGroup 源码中的成员变量

public class ThreadGroup implements Thread.UncaughtExceptionHandler {
private final ThreadGroup parent; // 父亲ThreadGroup
String name; // ThreadGroup 的名称
int maxPriority; // 线程最大优先级
boolean destroyed; // 是否被销毁
boolean daemon; // 是否守护线程
boolean vmAllowSuspension; // 是否可以中断

int nUnstartedThreads = 0; // 还未启动的线程
int nthreads; // ThreadGroup中线程数目
Thread threads[]; // ThreadGroup中的线程

int ngroups; // 线程组数目
ThreadGroup groups[]; // 线程组数组
}

下面看一下的它的构造方法,对于需要传入其它的线程组可以调用它第二个构造方法 ThreadGroup(ThreadGroup parent, String name)

// 私有构造函数
private ThreadGroup() {
this.name = "system";
this.maxPriority = Thread.MAX_PRIORITY;
this.parent = null;
}

// 默认是以当前ThreadGroup传入作为parent ThreadGroup,新线程组的父线程组是目前正在运行线程的线程组。
public ThreadGroup(String name) {
this(Thread.currentThread().getThreadGroup(), name);
}

// 构造函数
public ThreadGroup(ThreadGroup parent, String name) {
this(checkParentAccess(parent), parent, name);
}

// 私有构造函数,主要的构造函数
private ThreadGroup(Void unused, ThreadGroup parent, String name) {
this.name = name;
this.maxPriority = parent.maxPriority;
this.daemon = parent.daemon;
this.vmAllowSuspension = parent.vmAllowSuspension;
this.parent = parent;
parent.add(this);
}

总结:实现 Runnable 接口和 Callable 接口的区别

Runnable 自 Java 1.0 以来一直存在,但 Callable 在 Java 1.5 中引入,目的就是为了来处理 Runnable 不支持的用例。

Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

补充:工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。

Executors.callable(Runnable task)
// 或
Executors.callable(Runnable task,Object resule)

Runnable 接口

@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

Callable 接口

@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

总结:执行 execute()方法和 submit()方法的区别是什么呢?

  1. execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

就是通过一个回调来取得结果